NginxのログをLambda, Kinesis Streams を使って MongoDB Atlus に保存 – ClassmethodサーバーレスAdvent Calendar 2017 #serverless #adventcalendar
こんにちわ。西田@大阪です。このエントリはServerless Advent Calendar 2017 6日目の記事です
この記事ではNginxのログを Kinesis Streamsで収集しLambdaを使ってMongoDB Atlusに保存するところまでやってみます
MongoDB Atlusとは?
MongoDB社が提供している、MongoDB のフルマネージドサービスです
MongoDB as a Service:MongoDB Atlasを試してみた | Developers.IO
MongoDB で出来ること
せっかくなのでMongoDBのお気に入りの機能の一つ、配列のフィールドにインデックスを貼れるMultikey IndexesでHTTP Statusに応じて succeed
やfailure
と検索に使えるタグもつけたいと思います
構成
EC2上に動いてるNginxのログをKinesis Agentを使ってKinesis Streamsに書いていき、Lambdaを使ってMongoDB Atlasにデータを保存していきます
事前準備
MongoDB Atlusにサインアップ
MongoDB as a Service:MongoDB Atlasを試してみた
上記を参考に MongoDB Atlus のにサインアップしクラスタを作成します
今回は無料プランを使いたかったのでAWSのバージニアリージョンに作成しました
Kinesis Streamsをセットアップ
任意の名前とシャード数を設定しKinesis Streamsを作成します
KMSの鍵を作成
MongoDB Atlasへの接続情報を暗号化してLambdaの環境変数に設定するためKMSの鍵を作成します
EC2からKinesis Streamsに送信する準備
EC2上のNginxのログをKinesis Streamsに送信するための準備を行います
EC2のインスタンスプロファイルにKinesis, CloudWatchの適切なIAMを設定します
EC2上のNginxのログフォーマットを設定します。今回は簡単のためにタグ区切りで最低限の情報をログに出力させるようにしました
- /etc/nginx/nginx.conf
log_format main "$msec\t$remote_addr\t$status\t$request\t$http_user_agent";
Kinesis Agent はNginxのタグ区切りで出力されるaccess logをJSONとしてKinesis Streamsに送信するよう設定します
- /etc/aws-kinesis/agent.json
{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "", "firehose.endpoint": "", "flows": [ { // 送信する対象のパス "filePattern": "/var/log/nginx/access.log*", // Kinesis Streams の名前 "kinesisStream": "mongodb-sample", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", // JSONのフィールド名を指定。配列の順番とタブで区切られるログの値と対応させる "customFieldNames": [ "time", "remote_addr", "status", "method", "user_agent" ], // 区切り文字を指定 "delimiter": "\\t" } ], "partitionKeyOption": "RANDOM" } ] }
Lamdbaを設定する
MongoDB Atlas への接続情報を確認します
クラスタの管理画面から接続に必要なmongodb://
から始まる接続文字列とMongo Shellで接続するためのコマンドをメモします
MongoDBのIndexを準備する
タグ検索のためのIndexを準備します
こういった操作がSQLでなくJavaScriptで簡単に操作出来るのもMongoDBの特徴のひとつです
mongo
コマンドがインストール済みでない場合はインストールする必要があります(下記はMacの例です)
brew install mongodb --with-openssl
- indexを作成します
mongo "mongodb://cluster0-xxx.mongodb.net:27017,cluster0-xxx.mongodb.net:27017,cluster0-xxx.mongodb.net:27017/test?replicaSet=Cluster0-shard-0" --authenticationDatabase admin --ssl --username mongosample --password 設定したパスワード Cluster:PRIMARY> db.createCollection('logs') Cluster:PRIMARY> db.getCollection('logs').createIndex({tags: 1})
Lambda
事前準備
- Kinesis、CloudWatch、KMSなどのIAMを設定する
MONGODB_ATLAS_CLUSTER_URI
という名前で環境変数にMongodb Atlasへの接続情報を設定しKMSで暗号化します
ソース
こちらを参考にさせていただきました。
Node.js, AWS Lambda and MongoDB Atlas - Code Tutorial | MongoDB
- ハンドラー
const MongoClient = require('mongodb').MongoClient; const AWS = require('aws-sdk'); // 接続情報、DBへのコネクションをキャッシュする変数 let atlasConnectionUri = null; let cachedDb = null; exports.handler = (event, context, callback) => { var uri = process.env['MONGODB_ATLAS_CLUSTER_URI']; // 接続情報が既にあればそれを使い、なければKMSで復号化しキャッシュの変数に代入します if (atlasConnectionUri != null) { processEvent(event, context, callback); } else { const kms = new AWS.KMS(); kms.decrypt({ CiphertextBlob: new Buffer(uri, 'base64') }, (err, data) => { if (err) { console.log('Decrypt error:', err); return callback(err); } atlasConnectionUri = data.Plaintext.toString('ascii'); processEvent(event, context, callback); }); } };
- イベント処理
function processEvent(event, context, callback) { // DBのコネクションを使いまわすためにEvent Loopが残っててもLambdaが終了する設定を行います context.callbackWaitsForEmptyEventLoop = false; if (cachedDb == null) { console.log('=> connecting to database'); // MongoDBへ接続し、成功すればキャッシュの変数に代入します MongoClient.connect(atlasConnectionUri, (err, db) => { if (err != null) { console.error("an error occurred in connecting to database", err); callback(null, JSON.stringify(err)); } console.log('=> database connected'); cachedDb = db; return createDoc(db, event, callback); }); } else { // DBのコネクションが存在すればそれを使いまわします console.log('Re Use database connection from cache'); createDoc(cachedDb, event, callback); } }
- ドキュメントの挿入
function createDoc(db, event, callback) { let coll = db.collection('logs'); // Bulk処理のためのオブジェクトを生成 let batch = coll.initializeUnorderedBulkOp(); try { event.Records.map((record) => { // ログはBase64でエンコードされたJSON形式の文字列で送られてくるのでパースします const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii'); const obj = JSON.parse(payload); // HTTP Statusを見てタグを設定します if (!obj.status.startsWith('2')) { obj.tags = ['failure']; } // Bulk Insert するためのキューに登録します batch.insert(obj); }); // Bulk Insert で登録します batch.execute((err, result) => { if (err != null) { console.error('an error occured in batch insert'); callback(null, JSON.stringify(err)); } else { callback(null, "SUCCESS"); } }); // DBのコネクションを使いまわすため、db.closeは明示的に呼びません } catch (err) { console.error('error occured', err); } }
最後に
いかかでしたでしょうか?
多機能で柔軟なMongoDBをフルマネージドで使えるので、サーバーレスでDynamoDBでは難しいユースケースでは一考の価値があると思われます。
今回は接続制限などを特に行っていなく、パスワード認証のみでやりましたが、次回 VPC Lambda と VPC Peeringを使ったセキュアな接続もやってみたいと思います。